Read string data into an RDD
scala> val numbers=sc.parallelize(List("1,2,3,4,5,1,2,2,3,4,4,5,6"))
numbers: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> numbers.collect
res6: Array[String] = Array(1,2,3,4,5,1,2,2,3,4,4,5,6)
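Here parallelize is used so the example is self-contained; in practice an RDD[String] like this would more often come from a file via sc.textFile. A minimal sketch (the path is hypothetical):
scala> val fromFile = sc.textFile("/path/to/numbers.txt")
// fromFile is also an RDD[String], one element per line; the path is only an illustration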
Use flatMap to split each element into several elements of the RDD
The main reason we use flatMap here is that we want to return List-typed data and have it expanded into several records, whereas the commonly heard map returns one value as one record.
scala> val split = numbers.flatMap(x => x.split(","))
split: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:26
scala> split.collect
res7: Array[String] = Array(1, 2, 3, 4, 5, 1, 2, 2, 3, 4, 4, 5, 6)
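For contrast, a minimal sketch of what map would give on the same input: each input element yields exactly one output element, so the result is a nested RDD[Array[String]] rather than a flat RDD[String] (output elided):
scala> val notFlat = numbers.map(x => x.split(","))
// notFlat: RDD[Array[String]], one Array per input string, not flattened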
Use filter to screen the data: elements for which the function returns true are kept, and those returning false are discarded
scala> val filter = split.filter(x=>x!="6")
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:28
scala> filter.collect
res8: Array[String] = Array(1, 2, 3, 4, 5, 1, 2, 2, 3, 4, 4, 5)
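The predicate can be any expression that returns a Boolean. As a sketch, comparing the values numerically (after converting with toInt) keeps the same elements on this data:
scala> split.filter(x => x.toInt < 6).collect
// keeps every value below 6, so "6" is dropped; same result on this data set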
Use map to convert the data into key-value format. When processing data we often need to aggregate or group on certain fields, so at this point the data is converted into (key, value) pairs to designate which values the later operations should work on
scala> val keyValue = filter.map(x=>(x,1))
keyValue: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:30
scala> keyValue.collect
res12: Array[(String, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (1,1), (2,1), (2,1), (3,1), (4,1), (4,1), (5,1))
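Once the data is in (key, value) form, Spark's pair-RDD operations become available. As an aside, the action countByKey would return the same counts directly, as a local Map on the driver instead of an RDD:
scala> keyValue.countByKey()
// returns scala.collection.Map[String,Long] with the number of occurrences of each key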
Use reduceByKey to sum up the values under each key. In the example, x stands for the result accumulated so far and y for the value currently being read in. This may sound abstract, so take the pairs (1,1), (1,2), (1,3), (1,1) as an example: they all share the key 1, and applying ((x,y)=>x+y) over their values 1, 2, 3, 1 proceeds as follows (a local sketch of the same accumulation appears right after)
First pass: x=1, y=2, giving 3
Second pass: x=3, y=3, giving 6
Third pass: x=6, y=1, giving 7
The final value returned for that key is therefore 7
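The same accumulation can be reproduced with plain Scala's reduce over the values of a single key, a minimal local sketch of what reduceByKey does per key:
scala> List(1, 2, 3, 1).reduce((x, y) => x + y)
// evaluates ((1 + 2) + 3) + 1 = 7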
scala> val result = keyValue.reduceByKey((x,y)=>x+y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:32
scala> result.collect
res13: Array[(String, Int)] = Array((1,2), (2,3), (3,2), (4,3), (5,2))
The above is the result of summing the values for each key
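Putting everything together, this is the classic word-count pattern. As a recap, a minimal sketch chaining all of the steps above into a single expression:
scala> numbers.flatMap(x => x.split(",")).filter(x => x != "6").map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
// same counts as above; note that the ordering of a shuffled result is not guaranteed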